// Copyright 2019 eBay Inc.
// Primary authors: Simon Fell, Diego Ongaro,
//                  Raymond Kroeker, and Sathish Kandasamy.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exec

import (
	"context"
	"fmt"

	"github.com/ebay/akutan/blog"
	"github.com/ebay/akutan/facts/cache"
	"github.com/ebay/akutan/query/planner/plandef"
	"github.com/ebay/akutan/viewclient/lookups"
	opentracing "github.com/opentracing/opentracing-go"
)

// Execute runs the supplied query plan, as produced by the query planner, and
// streams its output to resCh as one or more ResultChunks. When the query
// yields no result rows, a single ResultChunk is still sent; it carries the
// column info and no rows.
//
// Execute blocks until every result has been sent to resCh or an error has
// occurred. resCh is closed by the time Execute returns, whatever the outcome.
// events receives callbacks while the query executes; a nil events is
// accepted and is replaced with ignoreEvents.
func Execute(ctx context.Context, events Events, index uint64, cache cache.FactCache, views lookups.All, query *plandef.Plan, resCh chan<- ResultChunk) error {
	if events == nil {
		events = ignoreEvents{}
	}
	// The operator tree built from the plan is always wrapped in an
	// emptyResultOp. That wrapper produces an empty ResultChunk (columns only)
	// when the query has no results, which the semantics of the API query
	// service require.
	var root queryOperator = &emptyResultOp{buildOperator(events, index, cache, views, query)}
	return root.run(ctx, new(defaultBinder), resCh)
}

// queryOperator represents an executable stage of the query. There is
// typically one queryOperator instance for each operator in the query plan,
// although that is not a requirement (Execute, for example, adds an extra
// emptyResultOp at the root).
type queryOperator interface {
	// run executes the query operator, sending its output to the supplied
	// result channel. It blocks until all of its values have been produced or
	// an error occurred. In either case the result channel is closed before
	// run returns.
	//
	// run can be asked to perform bulk requests by being passed a valueBinder
	// whose len() is greater than 1. In that case the queryOperator should
	// perform its operation once for each binding value. Results include the
	// binding offset they were produced for.
	run(context.Context, valueBinder, chan<- ResultChunk) error

	// columns describes the columns of every ResultChunk generated by this
	// queryOperator, in the order they will appear. This deliberately does not
	// use plandef.VarSet: those are sorted, and other orders must be
	// representable, e.g. the columns of a projection should match the order
	// given in the query.
	columns() Columns
}

// buildOperator walks the query plan and produces a tree of queryOperators
// with the same shape. The operators for a plan node's inputs are built first
// (recursively) and handed to the constructor for the node's own operator.
// Every operator built here is wrapped in a decoratedOp bound to events.
//
// It panics if the plan contains an operator type the executor does not know
// how to run.
func buildOperator(events Events, index blog.Index, cache cache.FactCache, views lookups.All, query *plandef.Plan) queryOperator {
	children := make([]queryOperator, 0, len(query.Inputs))
	for _, in := range query.Inputs {
		children = append(children, buildOperator(events, index, cache, views, in))
	}
	switch planOp := query.Operator.(type) {
	case *plandef.Ask:
		return &decoratedOp{events: events, op: newAsk(planOp, children)}
	case *plandef.Enumerate:
		return &decoratedOp{events: events, op: &enumerateOp{planOp}}
	case *plandef.ExternalIDs:
		return &decoratedOp{events: events, op: newExternalIDs(index, views, planOp, children)}
	case *plandef.OrderByOp:
		return &decoratedOp{events: events, op: newOrderByOp(planOp, children)}
	case *plandef.LimitAndOffsetOp:
		return &decoratedOp{events: events, op: newLimitAndOffsetOp(planOp, children)}
	case *plandef.DistinctOp:
		return &decoratedOp{events: events, op: newDistinctOp(planOp, children)}
	case *plandef.HashJoin:
		return &decoratedOp{events: events, op: newHashJoin(planOp, children)}
	case *plandef.LoopJoin:
		return &decoratedOp{events: events, op: newLoopJoin(planOp, children)}
	case *plandef.InferPO:
		return &decoratedOp{events: events, op: newInferPO(index, views, planOp, children)}
	case *plandef.InferSP:
		return &decoratedOp{events: events, op: newInferSP(index, views, planOp, children)}
	case *plandef.InferSPO:
		return &decoratedOp{events: events, op: newInferSPO(index, views, cache, planOp, children)}
	case *plandef.LookupPO:
		return &decoratedOp{events: events, op: newLookupPO(index, views, planOp, children)}
	case *plandef.LookupPOCmp:
		return &decoratedOp{events: events, op: newLookupPOCmp(index, views, planOp, children)}
	case *plandef.LookupS:
		return &decoratedOp{events: events, op: newLookupS(index, views, planOp, children)}
	case *plandef.LookupSP:
		return &decoratedOp{events: events, op: newLookupSP(index, views, planOp, children)}
	case *plandef.LookupSPO:
		return &decoratedOp{events: events, op: newLookupSPO(index, views, planOp, children)}
	case *plandef.Projection:
		return &decoratedOp{events: events, op: newProjection(planOp, children)}
	case *plandef.SelectLit:
		return &decoratedOp{events: events, op: newSelectLitOp(planOp, children)}
	case *plandef.SelectVar:
		return &decoratedOp{events: events, op: newSelectVarOp(planOp, children)}
	default:
		// Reaching here means the planner emitted an operator this executor
		// has no implementation for.
		panic(fmt.Sprintf("Unexpected operator in query executor: %T (%v)", query.Operator, query.Operator))
	}
}

// operator defines an executable operator instance. It sends its output to the
// provided results instance. Implementations are wrapped in a decoratedOp to
// turn them into queryOperators.
type operator interface {
	// execute runs the operator for the supplied bindings, writing its output
	// to the supplied results. decoratedOp.run reports the returned error in
	// the OpCompletedEvent and returns it to its own caller.
	execute(context.Context, valueBinder, results) error
	// operator returns the query plan operator that this instance executes.
	// decoratedOp uses its String() as the tracing span name and includes it
	// in the OpCompletedEvent.
	operator() plandef.Operator
	// columns returns the list of columns generated by this operator. All
	// ResultChunks generated by this operator will have their column results in
	// this order.
	columns() Columns
}

// results is where operator implementations send their output. Typically
// resultChunkBuilder is used as the implementation of this. Implementations of
// add should be safe for concurrent use. The values in rowValues should be in
// the same order as the operator's declared columns.
type results interface {
	// add records a single output row.
	// NOTE(review): offset is presumably the binding offset (the index into the
	// valueBinder) that produced this row, and f the facts supporting the row;
	// confirm against the implementation.
	add(ctx context.Context, offset uint32, f FactSet, rowValues []Value)
	// setFinalStatistics supplies the operator's FinalStatistics.
	// NOTE(review): presumably called at most once, when the operator has
	// finished producing rows; confirm against the implementation.
	setFinalStatistics(stats FinalStatistics)
}

// decoratedOp uses the supplied operator implementation to implement
// queryOperator. The resulting queryOperator will signal the relevant events,
// and deal with some other common housekeeping that every operator needs:
// starting and finishing a tracing span, building ResultChunks from the
// operator's output, and closing the result channel.
//
// events supplies the clock used to timestamp the run and is notified via
// OpCompleted when the run finishes. op is the wrapped operator that does the
// actual work.
type decoratedOp struct {
	events Events
	op     operator
}

// Compile-time check that *decoratedOp satisfies queryOperator.
var _ queryOperator = (*decoratedOp)(nil)

// columns implements queryOperator by reporting the columns of the wrapped
// operator unchanged.
func (d *decoratedOp) columns() Columns {
	cols := d.op.columns()
	return cols
}

// run implements queryOperator. It executes the wrapped operator inside a
// tracing span named after the plan operator, collects the operator's output
// through a chunk builder that writes to resCh, and reports an
// OpCompletedEvent to d.events once the operator has finished. resCh is
// closed before run returns, and the error from the wrapped operator (if any)
// is returned unchanged.
func (d *decoratedOp) run(ctx context.Context, binder valueBinder, resCh chan<- ResultChunk) error {
	span, spanCtx := opentracing.StartSpanFromContext(ctx, d.op.operator().String())
	span.SetTag(bulkCountTag, binder.len())
	clk := d.events.Clock()
	began := clk.Now()

	builder := newChunkBuilder(resCh, d.op.columns())
	execErr := d.op.execute(spanCtx, binder, builder)
	// The builder is flushed whether or not execute failed; what flush
	// returns is reported as the event's Output.
	output := builder.flush(spanCtx)
	d.events.OpCompleted(OpCompletedEvent{
		Operator:       d.op.operator(),
		InputBulkCount: binder.len(),
		StartedAt:      began,
		EndedAt:        clk.Now(),
		Output:         output,
		Err:            execErr,
	})

	// The event is delivered before the channel is closed and the span ends.
	close(resCh)
	span.Finish()
	return execErr
}
